import json
import os
import subprocess
import asyncio
from clogger import logger
import tempfile
import ast
from typing import List, Type, Optional
from apps.task.models import JobModel
from importlib import import_module
from django.conf import settings
from lib.utils import uuid_8
from asgiref.sync import sync_to_async, async_to_sync
from channel_job.job import default_channel_job_executor, JobResult
from service_scripts.base import (
    DiagnosisJob,
    DiagnosisJobResult,
    DiagnosisTask,
    DiagnosisTaskResult,
    DiagnosisPreProcessor,
    PostProcessResult,
    DiagnosisPostProcessor,
    DiagnosisHookProcessor,
    HookProcessResult,
)
from service_scripts.wrapper.base import DiagnosisPreProcessorPostWrapperBase


class DiagnosisHelper:
    """A helper class used to perform diagnosis task"""

    @staticmethod
    def init(data: dict, user: dict) -> JobModel:
        """Some params check, and create one new task with Ready status"""
        user_id = user["id"]
        params = data.copy()
        service_name = data.pop("service_name", None)
        task_id = uuid_8()

        for k, v in params.items():
            if isinstance(v, str):
                params[k] = v.strip()

        # 1. Determines if there is a task with the same parameters
        #    and a status of Running.
        if (
            JobModel.objects.filter(
                status__in=["Ready", "Running"],
                service_name=service_name,
                params=params,
            ).first()
            is not None
        ):
            raise Exception(
                f"node:{data.get('instance', '')}, There are tasks in progress, {service_name}"
            )

        # 2. Create a task with Ready status
        task_params = {
            "command": "",
            "task_id": task_id,
            "created_by": user_id,
            "params": params,
            "service_name": service_name,
            "status": "Ready",
        }
        return JobModel.objects.create(**task_params)

    @staticmethod
    def offline_import(data: dict, user: dict) -> JobModel:
        """Import offline diagnosis logs as a Job"""
        user_id = user["id"]
        task_id = uuid_8()
        service_name = data.get("service_name", None)
        if "channel" not in data:
            data["channel"] = "offline"
        task_params = {
            "command": "",
            "task_id": task_id,
            "created_by": user_id,
            "params": data,
            "service_name": service_name,
            "status": "Ready"
        }
        return JobModel.objects.create(**task_params)

    #################################################################################################
    # Sync
    #################################################################################################

    @staticmethod
    def run_subprocess(cmd: List[str]) -> dict:
        resp = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        return {
            "stdout": resp.stdout.decode("utf-8"),
            "stderr": resp.stderr.decode("utf-8"),
            "returncode": resp.returncode,
        }

    @staticmethod
    def preprocess(
        instance: JobModel, ignore_check: bool = False
    ) -> Optional[DiagnosisTask]:
        """ "Perform diagnosis preprocessing
        {
            "commands":[
                {
                    "instance":"xxx",
                    "cmd":"xxx",
                    "params":{ => overide initial param
                        "region":"target_region"
                    }
                }
            ]
        }
        """
        return async_to_sync(DiagnosisHelper.preprocess_async)(instance, ignore_check)

    @staticmethod
    async def execute(
        instance: JobModel, diagnosis_task: DiagnosisTask
    ) -> DiagnosisTaskResult:
        """Execute diagnosis task"""
        return async_to_sync(DiagnosisHelper.execute)(instance, diagnosis_task)

    @staticmethod
    def postprocess(instance: JobModel, diagnosis_task_result: DiagnosisTaskResult):
        """Perform diagnosis postprocessing
        JobResult -> {
            "code": 0,          => 0 表示成功，1表示失败
            "err_msg": "",      => 如果任务执行失败，错误信息可以使用这个字段获得
            "result": "xxx",    => 命令执行结果
            "echo": {
                "task_id": 0
            }
        }

        Postprocess script response data format =>
        {
            "code": 0,          => 0 表示成功，1表示失败
            "err_msg": "",      => 如果后处理脚本检测到诊断失败，在这边存放诊断错误信息
            "result": {}        => 后处理脚本处理的结果，应该是一个 JSON Object
        }
        """
        return async_to_sync(DiagnosisHelper.postprocess_async)(
            instance, diagnosis_task_result
        )

    #################################################################################################
    # Async
    #################################################################################################

    @staticmethod
    async def _update_job_async(instance: JobModel, **kwargs):
        """Update JobModel object"""
        try:
            if isinstance(kwargs.get("result", ""), dict):
                kwargs["result"] = json.dumps(kwargs.get("result", ""))
            instance.__dict__.update(**kwargs)
            await sync_to_async(instance.save)()
        except Exception as e:
            raise e

    @staticmethod
    async def run_subprocess_async(cmd: List[str]) -> dict:
        return DiagnosisHelper.run_subprocess(cmd)

    @staticmethod
    async def preprocess_v1_async(
        instance: JobModel, ignore_check: bool = False
    ) -> DiagnosisTask:
        """ "Perform diagnosis preprocessing

        {
            "commands":[
                {
                    "instance":"xxx",
                    "cmd":"xxx",
                    "params":{ => overide initial param
                        "region":"target_region"
                    }
                }
            ]
        }
        """
        service_name = instance.service_name
        params = instance.params.copy()
        if isinstance(params, str):
            try:
                params = json.loads(params)
            except Exception as exc:
                logger.exception(f"Task params loads error: {str(exc)}")

        # 2. Invoke preprocessing script（preprocessing script）
        SCRIPTS_DIR = settings.SCRIPTS_DIR
        service_path = os.path.join(SCRIPTS_DIR, service_name)

        # 防止任意命令执行
        if os.path.dirname(service_path) != SCRIPTS_DIR:
            raise Exception(f"Invalid pre-processing script: {service_path}")

        if not os.path.exists(service_path):
            raise Exception("Can not find script file, please check service name")
        try:
            resp = await DiagnosisHelper.run_subprocess_async(
                [service_path, json.dumps(params)]
            )
        except Exception as exc:
            raise Exception(f"Execute preprocess script error: {str(exc)}") from exc

        # 3. If the preprocessing script executes with an error
        if resp["returncode"] != 0:
            raise (Exception(f"Execute preprocess script error: {resp['stderr']}"))

        # 4. If the preprocessing script executes successfully,
        #    take out the processing result
        stdout = resp["stdout"]
        resp = ast.literal_eval(stdout)
        resp_scripts = resp.get("commands")

        # 5. If the preprocessing result not contains 'commands', it's a not expect bug
        if not resp_scripts:
            raise (
                Exception(
                    f"Not find commands, please check the preprocess script return"
                )
            )

        diagnosis_task = DiagnosisTask(
            jobs=[DiagnosisJob.from_dict(item) for item in resp_scripts], in_order=True
        )

        return diagnosis_task

    @staticmethod
    async def preprocess_v2_async(
        instance: JobModel, ignore_check: bool = False
    ) -> Optional[DiagnosisTask]:
        """Pre-processing V2

        Args:
            instance (JobModel): JobModel

        Returns:
            Optional[DiagnosisTask]: Diagnosis task
        """

        def _get_pre_processor(service_name: str) -> Type[DiagnosisPreProcessor]:
            """
            根据要执行的命令，动态引入一个 PreProcessor 的实现用于执行前处理
            """
            try:
                return import_module(f"service_scripts.{service_name}_pre").PreProcessor
            except Exception as e:
                raise Exception(f"No Pre-processor available => {str(e)}")

        # 1. Get params
        service_name = instance.service_name
        params = instance.params.copy()
        if isinstance(instance.params, str):
            try:
                params = json.loads(instance.params)
            except Exception as exc:
                raise Exception(f"Task params loads error: {str(exc)}")

        # 2. Use PreProcessor to check if the version of the tool meets the requirements
        try:
            params.pop("service_name", "")
            pre_processor = _get_pre_processor(service_name)(service_name, **params)
        except Exception as e:
            return None

        # 3. Use PreProcessor to convert params to diagnosis jobs
        diagnosis_task = pre_processor.get_diagnosis_cmds(params)
        if diagnosis_task is None or len(diagnosis_task.jobs) == 0:
            raise Exception(f"Pre-processor not return any diagnosis job")

        # 4. Check whether tool version satisfied current job
        if not diagnosis_task.offline_mode and not ignore_check:
            for job in diagnosis_task.jobs:
                job_params = {
                    "channel_type": params.pop("channel", "ssh"),
                    "channel_opt": "cmd",
                    "params": {
                        **params,
                        "instance": job.instance,
                        "command": pre_processor.get_version_cmd(),
                    },
                    "timeout": 1000 * 60,  # 1 minutes
                }
                job_result = await default_channel_job_executor.dispatch_job(
                    **job_params
                ).execute_async()
                if job_result.code != 0:
                    raise Exception(
                        f"Check tool version for '{instance}' failed: {job_result.err_msg}"
                    )
                if not pre_processor.version_check(job_result.result.strip()):
                    raise Exception(
                        f"Tool version less than {pre_processor.get_min_version_support()}"
                    )

        return diagnosis_task

    @staticmethod
    async def preprocess_async(
        instance: JobModel, ignore_check: bool = False
    ) -> Optional[DiagnosisTask]:
        """ "Perform diagnosis preprocessing
        {
            "commands":[
                {
                    "instance":"xxx",
                    "cmd":"xxx",
                    "params":{ => overide initial param
                        "region":"target_region"
                    }
                }
            ]
        }
        """
        diagnosis_task: Optional[DiagnosisTask] = None
        try:
            diagnosis_task = await DiagnosisHelper.preprocess_v2_async(
                instance, ignore_check
            )
            if diagnosis_task is None:
                diagnosis_task = await DiagnosisHelper.preprocess_v1_async(
                    instance, ignore_check
                )

            # If the pre-processor executes successfully, the parameters are compliant
            # and the Job instance is updated
            await DiagnosisHelper._update_job_async(
                instance, command=diagnosis_task.to_dict()
            )
        except Exception as exc:
            logger.exception(f"Diagnosis preprocess error: {str(exc)}")
            await DiagnosisHelper._update_job_async(
                instance,
                result="Diagnosis preprocess error",
                status="Fail",
                code=1,
                err_msg=f"Diagnosis preprocess error: {str(exc)}",
            )
        return diagnosis_task

    @staticmethod
    async def preprocess_post_wrapper_async(
        instance: JobModel, diagnosis_task: DiagnosisTask
    ) -> bool:
        """Preprocess post wrapper

        Args:
            diagnosis_task (DiagnosisTask): Diagnosis task
            wrapper (Type[DiagnosisPreProcessor]): Diagnosis preprocessor post wrapper
        """

        def _get_pre_processor_post_wrapper(
            wrapper_type: str,
        ) -> Type[DiagnosisPreProcessorPostWrapperBase]:
            try:
                return import_module(
                    f"service_scripts.wrapper.{wrapper_type}"
                ).DiagnosisPreProcessorPostWrapper
            except Exception as e:
                raise Exception(f"No Pre-processor-post-wrapper available => {str(e)}")

        try:
            # 1. Get params
            params = instance.params.copy()
            is_offline = params.get("channel", "") == "offline"
            if isinstance(instance.params, str):
                try:
                    params = json.loads(instance.params)
                except Exception as exc:
                    raise Exception(f"Task params loads error: {str(exc)}")
            if "sysom_preprocess_post_wrapper" not in params:
                return is_offline

            preprocess_post_wrapper = params.pop("sysom_preprocess_post_wrapper")
            wrapper = _get_pre_processor_post_wrapper(preprocess_post_wrapper)()
            wrapper.process(instance.task_id, diagnosis_task)
            await DiagnosisHelper._update_job_async(
                instance, command=diagnosis_task.to_dict()
            )
        except Exception as exc:
            logger.exception(f"Diagnosis preprocess post wrapper error: {str(exc)}")
            await DiagnosisHelper._update_job_async(
                instance,
                result="Diagnosis preprocess post wrapper error",
                status="Fail",
                code=1,
                err_msg=f"Diagnosis preprocess post wrapper error: {str(exc)}",
            )

        return is_offline

    @staticmethod
    async def execute_async(
        instance: JobModel, diagnosis_task: DiagnosisTask
    ) -> DiagnosisTaskResult:
        """Execute diagnosis task"""
        diagnosis_task_result = DiagnosisTaskResult(
            code=1,
            job_results=[],
            err_msg="Diagnosis execute task error",
            in_order=diagnosis_task.in_order,
        )
        try:
            params = instance.params.copy()
            if isinstance(params, str):
                try:
                    params = json.loads(params)
                except Exception as exc:
                    raise Exception(f"Task params loads error: {str(exc)}") from exc

            if diagnosis_task.in_order:
                # Execute diagnosis jobs in order
                for job in diagnosis_task.jobs:
                    job_params = {
                        "channel_type": params.pop("channel", "ssh"),
                        "channel_opt": "cmd",
                        "params": {
                            **params,
                            "instance": job.instance,
                            "command": job.cmd,
                        },
                        "echo": {"task_id": instance.task_id},
                        "timeout": 1000 * 60 * 10,  # 10 minutes
                    }
                    job_result = await default_channel_job_executor.dispatch_job(
                        **job_params
                    ).execute_async()

                    diagnosis_task_result.job_results.append(
                        DiagnosisJobResult(
                            code=job_result.code,
                            err_msg=job_result.err_msg,
                            job=job,
                            stdout=job_result.result,
                        )
                    )

                    if job_result.code != 0:
                        raise Exception(f"Task execute failed: {job_result.err_msg}")
            else:
                # Execute diagnosis jobs in parallel
                tasks = []
                for job in diagnosis_task.jobs:
                    job_params = {
                        "channel_type": params.pop("channel", "ssh"),
                        "channel_opt": "cmd",
                        "params": {
                            **params,
                            "instance": job.instance,
                            "command": job.cmd,
                        },
                        "echo": {"task_id": instance.task_id},
                        "timeout": 1000 * 60 * 10,  # 10 minutes
                    }
                    tasks.append(
                        default_channel_job_executor.dispatch_job(
                            **job_params
                        ).execute_async()
                    )
                job_results = await asyncio.gather(*tasks)
                diagnosis_task_result.job_results = [
                    DiagnosisJobResult(
                        code=job_result.code,
                        err_msg=job_result.err_msg,
                        job=diagnosis_task.jobs[idx],
                        stdout=job_result.result,
                    )
                    for idx, job_result in enumerate(job_results)
                ]
                for job_result in job_results:
                    if job_result.code != 0:
                        raise Exception(f"Task execut failed: {job_result.err_msg}")
            diagnosis_task_result.code = 0
            diagnosis_task_result.err_msg = ""
        except Exception as exc:
            diagnosis_task_result.err_msg = f"Diagnosis execute task error: {str(exc)}"
            logger.exception(diagnosis_task_result.err_msg)
            await DiagnosisHelper._update_job_async(
                instance,
                result="Diagnosis execute task error",
                status="Fail",
                code=1,
                err_msg=diagnosis_task_result.err_msg,
            )
        return diagnosis_task_result

    @staticmethod
    async def postprocess_v1_async(
        instance: JobModel, diagnosis_task_result: DiagnosisTaskResult
    ) -> PostProcessResult:
        service_name = instance.service_name
        # 执行后处理脚本，将结果整理成前端可识别的规范结构
        SCRIPTS_DIR = settings.SCRIPTS_DIR
        service_post_name = service_name + "_post"
        service_post_path = os.path.join(SCRIPTS_DIR, service_post_name)

        # 防止任意命令执行
        if os.path.dirname(service_post_path) != SCRIPTS_DIR:
            raise Exception(f"Invalid post-processing script: {service_post_path}")

        if not os.path.exists(service_post_path):
            raise Exception(
                f"No matching post-processing script found: {service_post_path}"
            )

        # 创建一个临时文件，用于暂存中间结果
        with tempfile.NamedTemporaryFile(mode="w") as tmp_file:
            try:
                # 将要传递的中间结果写入到临时文件当中
                tmp_file.write(
                    "".join([item.stdout for item in diagnosis_task_result.job_results])
                )
                tmp_file.flush()
                resp = await DiagnosisHelper.run_subprocess_async(
                    [service_post_path, tmp_file.name, instance.task_id]
                )
            except Exception as exc:
                raise Exception(f"Execute postprocess script error: {str(exc)}")

            if resp["returncode"] != 0:
                raise (
                    Exception(
                        f"Execute postprocess script error: {str(resp['stderr'])}"
                    )
                )
            result = json.loads(resp["stdout"].strip())
            return PostProcessResult.from_dict(result)

    @staticmethod
    async def postprocess_v2_async(
        instance: JobModel, diagnosis_task_result: DiagnosisTaskResult
    ) -> Optional[PostProcessResult]:
        """Post-processing V2

        Args:
            instance (JobModel): JobModel
            diagnosis_task_result(DiagnosisTaskResult): Diagnosis task result, contain all job results

        Returns:
            Optional[DiagnosisTask]: Diagnosis task
        """

        def _get_post_processor(service_name: str) -> Type[DiagnosisPostProcessor]:
            """
            根据要执行的命令，动态引入一个 PreProcessor 的实现用于执行前处理
            """
            try:
                return import_module(
                    f"service_scripts.{service_name}_post"
                ).PostProcessor
            except Exception as e:
                raise Exception(f"No Pre-processor available => {str(e)}")

        with tempfile.TemporaryDirectory() as tmp_dir:
            # Fetch files from diagnosis task result
            for job_result in diagnosis_task_result.job_results:
                if job_result.job is None:
                    continue
                job_result.file_list = job_result.job.fetch_file_list
                for file_item in job_result.job.fetch_file_list:
                    file_item.local_path = os.path.join(tmp_dir, file_item.name)
                    await default_channel_job_executor.dispatch_file_job(
                        "get-file",
                        {
                            "instance": job_result.job.instance,
                            "local_path": file_item.local_path,
                            "remote_path": file_item.remote_path,
                        },
                    ).execute_async()

            service_name = instance.service_name
            # Use PostProcessor to convert diagnosis result to fron-end format data
            try:
                post_processor = _get_post_processor(service_name)(service_name)
            except Exception as e:
                return None
            return post_processor.parse_diagnosis_result(
                diagnosis_task_result.job_results
            )

    @staticmethod
    async def postprocess_async(
        instance: JobModel, diagnosis_task_result: DiagnosisTaskResult
    ):
        """Perform diagnosis postprocessing
        JobResult -> {
            "code": 0,          => 0 表示成功，1表示失败
            "err_msg": "",      => 如果任务执行失败，错误信息可以使用这个字段获得
            "result": "xxx",    => 命令执行结果
            "echo": {
                "task_id": 0
            }
        }

        Postprocess script response data format =>
        {
            "code": 0,          => 0 表示成功，1表示失败
            "err_msg": "",      => 如果后处理脚本检测到诊断失败，在这边存放诊断错误信息
            "result": {}        => 后处理脚本处理的结果，应该是一个 JSON Object
        }
        """

        code = diagnosis_task_result.code
        err_msg = diagnosis_task_result.err_msg
        if code != 0:
            # Diagnosis task execute failed
            await DiagnosisHelper._update_job_async(
                instance,
                status="Fail",
                code=code,
                result=diagnosis_task_result.job_results[0].stdout,
                err_msg=err_msg,
            )
            return
        post_process_result: Optional[PostProcessResult] = None
        try:
            # 1. 优先尝试使用 V2 方案
            post_process_result = await DiagnosisHelper.postprocess_v2_async(
                instance, diagnosis_task_result
            )
            if post_process_result is None:
                # 2. 没有为目标诊断匹配到 v2 方案，回退尝试 v1 方案
                post_process_result = await DiagnosisHelper.postprocess_v1_async(
                    instance, diagnosis_task_result
                )

            if post_process_result.code != 0:
                # 后处理脚本认为诊断出错
                await DiagnosisHelper._update_job_async(
                    instance, err_msg=post_process_result.err_msg, status="Fail"
                )
            else:
                # 后处理脚本执行成功，更新任务状态
                await DiagnosisHelper._update_job_async(
                    instance, result=post_process_result.result, status="Success"
                )
        except Exception as exc:
            logger.exception(f"Diagnosis postprocess error: {str(exc)}")
            await DiagnosisHelper._update_job_async(
                instance,
                result="Diagnosis postprocess error",
                status="Fail",
                code=1,
                err_msg=f"Diagnosis postprocess error: {str(exc)}",
            )

    @staticmethod
    def invoke_diagnosis_hook(
        instance: JobModel, hook_params: dict
    ) -> HookProcessResult:
        return async_to_sync(DiagnosisHelper.invoke_diagnosis_hook_async)(
            instance, hook_params
        )

    @staticmethod
    async def invoke_diagnosis_hook_async(
        instance: JobModel, hook_params: dict
    ) -> HookProcessResult:
        """Invoke task hook"""

        def _get_diagnosis_hook(service_name: str) -> Type[DiagnosisHookProcessor]:
            try:
                return import_module(
                    f"service_scripts.{service_name}_hook"
                ).HookProcessor
            except Exception as e:
                raise Exception(f"No Diagnosis-Hook-Processor available => {str(e)}")

        res = HookProcessResult(code=1, err_msg="Invoke diagnosis hook error", data={})
        try:
            # 1. Get params
            params = instance.params.copy()
            if isinstance(instance.params, str):
                try:
                    params = json.loads(instance.params)
                except Exception as exc:
                    raise Exception(f"Task params loads error: {str(exc)}")
            service_name = params.get("service_name", "")

            hooker = _get_diagnosis_hook(service_name)(service_name)
            res = await hooker.invoke_hook(instance, hook_params)
        except Exception as exc:
            logger.exception(f"Diagnosis hook invoke error: {str(exc)}")
        return res
